package com.lee.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Demo Kafka producer that publishes a numbered greeting to {@link #TOPIC}
 * every 500 ms until the process is stopped.
 *
 * @author lihaiqiang
 * @date 2022/1/20
 */
public class Producer {

    /**
     * Name of the topic the messages are published to.
     */
    public static final String TOPIC = "mytopic";

    /**
     * Builds a String/String producer and sends {@code "Hello,<n>"} messages
     * in an endless loop, reporting the delivery result of each one.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the sending thread is interrupted while sleeping,
     *                   or if creating/using the producer fails
     */
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Kafka broker addresses; multiple addresses are separated by commas.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
        // Keys and values are both serialized as plain strings.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // try-with-resources closes the producer on any exit path.
        try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props)) {
            for (int i = 0; ; i++) {
                String msg = "Hello," + i;
                // Record is created without a key (topic + value only).
                ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, msg);
                // send() is asynchronous: it only enqueues the record. The outcome is
                // reported through the callback, so success is printed only once the
                // send has actually completed, and failures are no longer dropped.
                kafkaProducer.send(record, (metadata, exception) -> {
                    if (exception == null) {
                        System.out.println("消息发送成功:" + msg);
                    } else {
                        System.err.println("消息发送失败:" + msg);
                        exception.printStackTrace();
                    }
                });
                Thread.sleep(500);
            }
        }
    }
}
